package com.flink_demo.demo.eventtime.kafka_producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消息生产者
 * 
 * @author xiaojf 2017/3/22 14:27
 */
public class MsgProducer {

	/**
	 * 消息发送后的回调函数
	 */
	static class MsgProducerCallback implements Callback {

		private final long startTime;
		private final String key;
		private final String msg;

		public MsgProducerCallback(long startTime, String key, String msg) {
			this.startTime = startTime;
			this.key = key;
			this.msg = msg;
		}

		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			long elapsedTime = System.currentTimeMillis() - startTime;
			if (recordMetadata != null) {
				System.out.println(msg + " be sended to partition no : " + recordMetadata.partition());
			}
		}
	}

	public static void main(String args[]) throws InterruptedException {
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.200:9092");// broker
																						// 集群地址
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");// key
																			// 序列号方式
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");// value
																			// 序列号方式
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
		String topic = "event_time";
		boolean isAsync = false;
		for (long msgNo = 0; msgNo < 1000; msgNo++) {
			String key = msgNo + "";
			String msg = msgNo + "";
//			String msg = "0001,2,2017-10-01 12:00:28";
//			String msg = "0001,2,2017-10-01 12:01:00";
			Thread.sleep(300);
			if (isAsync) {// 异步
				producer.send(new ProducerRecord<String, String>(topic, msg));
				// producer.send(new ProducerRecord<String, String>(this.topic,
				// key, msg));
				System.out.println("发送异步数据:" + msg);
			} else {// 同步
				producer.send(new ProducerRecord<String, String>(topic, msg),
						new MsgProducerCallback(System.currentTimeMillis(), key, msg));
				System.out.println("发送同步数据:" + msg);
			}
		}
		producer.flush();
	}
}
